/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.client;

import java.util.UUID;

import javax.jms.BytesMessage;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.slf4j.Logger;

import org.apache.qpid.QpidException;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageConverter;
import org.apache.qpid.client.util.JMSExceptionHelper;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.util.UUIDGen;
import org.apache.qpid.util.UUIDs;

public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
{


    enum PublishMode { ASYNC_PUBLISH_ALL, SYNC_PUBLISH_PERSISTENT, SYNC_PUBLISH_ALL };

    private final Logger _logger ;

    private AMQConnection _connection;

    private boolean _disableTimestamps;

    /**
     * Priority of messages created by this producer.
     */
    private int _messagePriority = Message.DEFAULT_PRIORITY;

    /**
     * Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution.
     */
    private long _timeToLive;

    /**
     * Delivery mode used for this producer.
     */
    private int _deliveryMode = DeliveryMode.PERSISTENT;

    private AMQDestination _destination;

    /**
     * True if this producer was created from a transacted session
     */
    private boolean _transacted;

    private int _channelId;

    /**
     * This is an id generated by the session and is used to tie individual producers to the session. This means we
     * can deregister a producer with the session when the producer is clsoed. We need to be able to tie producers
     * to the session so that when an error is propagated to the session it can close the producer (meaning that
     * a client that happens to hold onto a producer reference will get an error if he tries to use it subsequently).
     */
    private long _producerId;

    private AMQSession _session;

    private final boolean _immediate;

    private final Boolean _mandatory;

    private boolean _disableMessageId;

    private UUIDGen _messageIdGenerator = UUIDs.newGenerator();

    private String _userID;  // ref user id used in the connection.

    private long _deliveryDelay;


    /**
     * The default value for immediate flag used this producer is false. That is, a consumer does
     * not need to be attached to a queue.
     */
    private final boolean _defaultImmediateValue = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false"));

    /**
     * The default value for mandatory flag used by this producer is true. That is, server will not
     * silently drop messages where no queue is connected to the exchange for the message.
     */
    private final boolean _defaultMandatoryValue = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true"));

    /**
     * The default value for mandatory flag used by this producer when publishing to a Topic is false. That is, server
     * will silently drop messages where no queue is connected to the exchange for the message.
     */
    private final boolean _defaultMandatoryTopicValue =
            Boolean.parseBoolean(System.getProperty("qpid.default_mandatory_topic",
                    System.getProperties().containsKey("qpid.default_mandatory")
                            ? System.getProperty("qpid.default_mandatory")
                            : "false"));

    private PublishMode _publishMode = PublishMode.ASYNC_PUBLISH_ALL;

    protected BasicMessageProducer(Logger logger,AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
                                   AMQSession session, long producerId, Boolean immediate, Boolean mandatory) throws
                                                                                                              QpidException
    {
    	_logger = logger;
    	_connection = connection;
        _destination = destination;
        _transacted = transacted;
        _channelId = channelId;
        _session = session;
        _producerId = producerId;
        if (destination != null  && !(destination.neverDeclare()))
        {
            declareDestination(destination);
        }

        _immediate = immediate == null ? _defaultImmediateValue : immediate;
        _mandatory = mandatory == null
                ? destination == null ? null
                                      : destination instanceof Topic
                                            ? _defaultMandatoryTopicValue
                                            : _defaultMandatoryValue
                : mandatory;

        _userID = connection.isPopulateUserId() ? connection.getUsername() : null;

        if(destination != null && destination.getDeliveryDelay() != 0L)
        {
            _deliveryDelay = destination.getDeliveryDelay();
        }
        setPublishMode();
    }

    protected AMQConnection getConnection()
    {
        return _connection;
    }

    void setPublishMode()
    {
        // Publish mode could be configured at destination level as well.
        // Will add support for this when we provide a more robust binding URL

        String syncPub = _connection.getSyncPublish();
        // Support for deprecated option sync_persistence
        if (syncPub.equals("persistent") || _connection.getSyncPersistence())
        {
            _publishMode = PublishMode.SYNC_PUBLISH_PERSISTENT;
        }
        else if (syncPub.equals("all"))
        {
            _publishMode = PublishMode.SYNC_PUBLISH_ALL;
        }

        if (_logger.isDebugEnabled())
        {
        	_logger.debug("MessageProducer " + toString() + " using publish mode : " + _publishMode);
        }
    }

    void resubscribe() throws QpidException
    {
        if (_destination != null && !_destination.neverDeclare())
        {
            declareDestination(_destination);
        }
    }

    abstract void declareDestination(AMQDestination destination) throws QpidException;

    public void setDisableMessageID(boolean b) throws JMSException
    {
        checkPreConditions();
        checkNotClosed();
        _disableMessageId = b;
    }

    public boolean getDisableMessageID() throws JMSException
    {
        checkNotClosed();

        return _disableMessageId;
    }

    public void setDisableMessageTimestamp(boolean b) throws JMSException
    {
        checkPreConditions();
        _disableTimestamps = b;
    }

    public boolean getDisableMessageTimestamp() throws JMSException
    {
        checkNotClosed();

        return _disableTimestamps;
    }

    public void setDeliveryMode(int i) throws JMSException
    {
        checkPreConditions();
        if ((i != DeliveryMode.NON_PERSISTENT) && (i != DeliveryMode.PERSISTENT))
        {
            throw new JMSException("DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + i
                                   + " is illegal");
        }

        _deliveryMode = i;
    }

    public int getDeliveryMode() throws JMSException
    {
        checkNotClosed();

        return _deliveryMode;
    }

    public void setPriority(int i) throws JMSException
    {
        checkPreConditions();
        if ((i < 0) || (i > 9))
        {
            throw new IllegalArgumentException("Priority of " + i + " is illegal. Value must be in range 0 to 9");
        }

        _messagePriority = i;
    }

    public int getPriority() throws JMSException
    {
        checkNotClosed();

        return _messagePriority;
    }

    public void setTimeToLive(long l) throws JMSException
    {
        checkPreConditions();
        if (l < 0)
        {
            throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + l);
        }

        _timeToLive = l;
    }

    public long getTimeToLive() throws JMSException
    {
        checkNotClosed();

        return _timeToLive;
    }

    protected AMQDestination getAMQDestination()
    {
        return _destination;
    }

    /**
     * The Destination used for this consumer, if specified upon creation.
     */
    public Destination getDestination() throws JMSException
    {
        checkNotClosed();

        return _destination;
    }

    public void close() throws JMSException
    {
        setClosed();
        _session.deregisterProducer(_producerId);
        AMQDestination dest = getAMQDestination();
        AMQSession ssn = getSession();
        if (!ssn.isClosed() && dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
        {
            try
            {
                if (dest.getDelete() == AMQDestination.AddressOption.ALWAYS ||
                    dest.getDelete() == AMQDestination.AddressOption.SENDER )
                {
                    ssn.handleNodeDelete(dest);
                }
                ssn.handleLinkDelete(dest);
            }
            catch(TransportException e)
            {
                throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e);
            }
            catch (QpidException e)
            {
                throw JMSExceptionHelper.chainJMSException(new JMSException("Exception while closing producer:"
                                                                            + e.getMessage()), e);
            }
        }
    }

    public void send(Message message) throws JMSException
    {
        checkPreConditions();
        checkInitialDestination();

        synchronized (_connection.getFailoverMutex())
        {
            sendImpl(_destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate,
                     _deliveryDelay);
        }
    }

    public void send(Message message, int deliveryMode) throws JMSException
    {
        checkPreConditions();
        checkInitialDestination();

        synchronized (_connection.getFailoverMutex())
        {
            sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate,
                     _deliveryDelay);
        }
    }

    public void send(Message message, int deliveryMode, boolean immediate) throws JMSException
    {
        checkPreConditions();
        checkInitialDestination();
        synchronized (_connection.getFailoverMutex())
        {
            sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, _mandatory, immediate,
                     _deliveryDelay);
        }
    }

    public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
    {
        checkPreConditions();
        checkInitialDestination();
        synchronized (_connection.getFailoverMutex())
        {
            sendImpl(_destination, message, deliveryMode, priority, timeToLive, _mandatory, _immediate, _deliveryDelay);
        }
    }

    public void send(Destination destination, Message message) throws JMSException
    {
        checkPreConditions();
        checkDestination(destination);
        synchronized (_connection.getFailoverMutex())
        {
            validateDestination(destination);
            AMQDestination amqDestination = (AMQDestination) destination;
            sendImpl(amqDestination, message, _deliveryMode, _messagePriority, _timeToLive,
                    _mandatory == null
                            ? destination instanceof Topic
                                ? _defaultMandatoryTopicValue
                                : _defaultMandatoryValue
                            : _mandatory,
                     _immediate,
                     amqDestination.getDeliveryDelay() != 0L ? amqDestination.getDeliveryDelay() : _deliveryDelay);
        }
    }

    public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive)
        throws JMSException
    {
        checkPreConditions();
        checkDestination(destination);
        synchronized (_connection.getFailoverMutex())
        {
            validateDestination(destination);
            AMQDestination amqDestination = (AMQDestination) destination;
            sendImpl(amqDestination, message, deliveryMode, priority, timeToLive,
                    _mandatory == null
                            ? destination instanceof Topic
                                ? _defaultMandatoryTopicValue
                                : _defaultMandatoryValue
                            : _mandatory,
                     _immediate,
                     amqDestination.getDeliveryDelay() != 0L ? amqDestination.getDeliveryDelay() : _deliveryDelay);
        }
    }

    public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
                     boolean mandatory) throws JMSException
    {
        checkPreConditions();
        checkDestination(destination);
        synchronized (_connection.getFailoverMutex())
        {
            validateDestination(destination);
            AMQDestination amqDestination = (AMQDestination) destination;
            sendImpl(amqDestination, message, deliveryMode, priority, timeToLive, mandatory, _immediate,
                     amqDestination.getDeliveryDelay() != 0L ? amqDestination.getDeliveryDelay() : _deliveryDelay);
        }
    }

    public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
                     boolean mandatory, boolean immediate) throws JMSException
    {
        checkPreConditions();
        checkDestination(destination);
        synchronized (_connection.getFailoverMutex())
        {
            validateDestination(destination);
            AMQDestination amqDestination = (AMQDestination) destination;
            sendImpl(amqDestination, message, deliveryMode, priority, timeToLive, mandatory, immediate,
                     amqDestination.getDeliveryDelay() != 0L ? amqDestination.getDeliveryDelay() : _deliveryDelay);
        }
    }

    private AbstractJMSMessage convertToNativeMessage(Message message) throws JMSException
    {
        if (message instanceof AbstractJMSMessage)
        {
            return (AbstractJMSMessage) message;
        }
        else
        {
            AbstractJMSMessage newMessage;

            if (message instanceof BytesMessage)
            {
                newMessage = new MessageConverter(_session, (BytesMessage) message).getConvertedMessage();
            }
            else if (message instanceof MapMessage)
            {
                newMessage = new MessageConverter(_session, (MapMessage) message).getConvertedMessage();
            }
            else if (message instanceof ObjectMessage)
            {
                newMessage = new MessageConverter(_session, (ObjectMessage) message).getConvertedMessage();
            }
            else if (message instanceof TextMessage)
            {
                newMessage = new MessageConverter(_session, (TextMessage) message).getConvertedMessage();
            }
            else if (message instanceof StreamMessage)
            {
                newMessage = new MessageConverter(_session, (StreamMessage) message).getConvertedMessage();
            }
            else
            {
                newMessage = new MessageConverter(_session, message).getConvertedMessage();
            }

            if (newMessage != null)
            {
                return newMessage;
            }
            else
            {
                throw new JMSException("Unable to send message, due to class conversion error: "
                                       + message.getClass().getName());
            }
        }
    }

    private void validateDestination(Destination destination) throws JMSException
    {
        if (!(destination instanceof AMQDestination))
        {
            throw new InvalidDestinationException("Unsupported destination class: "
                                   + ((destination != null) ? destination.getClass() : null));
        }

        AMQDestination amqDestination = (AMQDestination) destination;
        if (!_session.isResolved(amqDestination))
        {
            try
            {
                declareDestination(amqDestination);
            }
            catch(Exception e)
            {
                throw JMSExceptionHelper.chainJMSException(new InvalidDestinationException(
                        "Error validating destination"), e);
            }
        }
    }

    /**
     * The caller of this method must hold the failover mutex.
     *
     * @param destination
     * @param origMessage
     * @param deliveryMode
     * @param priority
     * @param timeToLive
     * @param mandatory
     * @param immediate
     *
     * @param deliveryDelay
     * @throws JMSException
     */
    protected void sendImpl(AMQDestination destination,
                            Message origMessage,
                            int deliveryMode,
                            int priority,
                            long timeToLive,
                            boolean mandatory,
                            boolean immediate,
                            long deliveryDelay) throws JMSException
    {
        checkTemporaryDestination(destination);
        origMessage.setJMSDestination(destination);

        AbstractJMSMessage message = convertToNativeMessage(origMessage);

        UUID messageId = null;
        if (_disableMessageId)
        {
            message.setJMSMessageID((UUID)null);
        }
        else
        {
            messageId = _messageIdGenerator.generate();
            message.setJMSMessageID(messageId);
        }

        try
        {
            sendMessage(destination, origMessage, message, messageId, deliveryMode, priority, timeToLive, mandatory, immediate,
                        deliveryDelay);
        }
        catch (TransportException e)
        {
            throw getSession().toJMSException("Exception whilst sending:" + e.getMessage(), e);
        }

        if (message != origMessage)
        {
            _logger.debug("Updating original message");
            origMessage.setJMSPriority(message.getJMSPriority());
            origMessage.setJMSTimestamp(message.getJMSTimestamp());
            if (_logger.isDebugEnabled())
            {
            	_logger.debug("Setting JMSExpiration:" + message.getJMSExpiration());
            }
            origMessage.setJMSExpiration(message.getJMSExpiration());
            origMessage.setJMSMessageID(message.getJMSMessageID());
        }

        if (_transacted)
        {
            _session.markDirty();
        }
    }

    abstract void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
                              UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory,
                              boolean immediate, final long deliveryDelay) throws JMSException;

    private void checkTemporaryDestination(AMQDestination destination) throws InvalidDestinationException
    {
        if (destination instanceof TemporaryDestination)
        {
            _logger.debug("destination is temporary destination");
            TemporaryDestination tempDest = (TemporaryDestination) destination;
            if (tempDest.getSession().isClosed())
            {
                _logger.debug("session is closed");
                throw new InvalidDestinationException("Session for temporary destination has been closed");
            }

            if (tempDest.isDeleted())
            {
                _logger.debug("destination is deleted");
                throw new InvalidDestinationException("Cannot send to a deleted temporary destination");
            }
        }
    }

    private void checkPreConditions() throws JMSException
    {
        checkNotClosed();

        if ((_session == null) || _session.isClosed())
        {
            throw new javax.jms.IllegalStateException("Invalid Session");
        }
        if(_session.getAMQConnection().isClosed())
        {
            throw new javax.jms.IllegalStateException("Connection closed");
        }
    }

    private void checkInitialDestination() throws JMSException
    {
        if (_destination == null)
        {
            throw new UnsupportedOperationException("Destination is null");
        }
        checkValidQueue();
    }

    private void checkDestination(Destination suppliedDestination) throws JMSException
    {
        if ((_destination != null) && (suppliedDestination != null))
        {
            throw new UnsupportedOperationException(
                    "This message producer was created with a Destination, therefore you cannot use an unidentified Destination");
        }

        if(suppliedDestination instanceof AMQQueue)
        {
            AMQQueue destination = (AMQQueue) suppliedDestination;
            checkValidQueue(destination);
        }
        if (suppliedDestination == null)
        {
            throw new InvalidDestinationException("Supplied Destination was invalid");
        }

    }

    private void checkValidQueue() throws JMSException
    {
        if(_destination instanceof AMQQueue)
        {
            checkValidQueue((AMQQueue) _destination);
        }
    }

    private void checkValidQueue(AMQQueue destination) throws JMSException
    {
        if (!destination.isCheckedForQueueBinding() && validateQueueOnSend())
        {
            if (getSession().isStrictAMQP())
            {
                getLogger().warn("AMQP does not support destination validation before publish");
                destination.setCheckedForQueueBinding(true);
            }
            else
            {
                if (isBound(destination))
                {
                    destination.setCheckedForQueueBinding(true);
                }
                else
                {
                    throw new InvalidDestinationException("Queue: " + destination.getQueueName()
                        + " is not a valid destination (no binding on server)");
                }
            }
        }
    }

    private boolean validateQueueOnSend()
    {
        return _connection.validateQueueOnSend();
    }

    /**
     * The session used to create this producer
     */
    public AMQSession getSession()
    {
        return _session;
    }

    public boolean isBound(AMQDestination destination) throws JMSException
    {
        try
        {
            return _session.isQueueBound(destination.getExchangeName(), null, destination.getRoutingKey());
        }
        catch (TransportException e)
        {
            throw getSession().toJMSException("Exception whilst checking destination binding:" + e.getMessage(), e);
        }
    }

    /**
     * If true, messages will not get a timestamp.
     */
    protected boolean isDisableTimestamps()
    {
        return _disableTimestamps;
    }

    protected void setDisableTimestamps(boolean disableTimestamps)
    {
        _disableTimestamps = disableTimestamps;
    }

    protected void setDestination(AMQDestination destination)
    {
        _destination = destination;
    }

    protected int getChannelId()
    {
        return _channelId;
    }

    protected void setChannelId(int channelId)
    {
        _channelId = channelId;
    }

    protected void setSession(AMQSession session)
    {
        _session = session;
    }

    protected String getUserID()
    {
        return _userID;
    }

    protected void setUserID(String userID)
    {
        _userID = userID;
    }

    protected PublishMode getPublishMode()
    {
        return _publishMode;
    }

    protected void setPublishMode(PublishMode publishMode)
    {
        _publishMode = publishMode;
    }

    public long getDeliveryDelay()
    {
        return _deliveryDelay;
    }

    @Override
    public void setDeliveryDelay(final long deliveryDelay)
    {
        _deliveryDelay = deliveryDelay;
    }

    Logger getLogger()
    {
        return _logger;
    }

}
